#include <userver/utils/periodic_task.hpp>

#include <random>
#include <tuple>

#include <fmt/format.h>

#include <userver/engine/async.hpp>
#include <userver/engine/condition_variable.hpp>
#include <userver/engine/single_consumer_event.hpp>
#include <userver/engine/sleep.hpp>
#include <userver/engine/task/cancel.hpp>
#include <userver/engine/task/task_with_result.hpp>
#include <userver/logging/log.hpp>
#include <userver/rcu/rcu.hpp>
#include <userver/testsuite/periodic_task_control.hpp>
#include <userver/tracing/span.hpp>
#include <userver/tracing/tracer.hpp>
#include <userver/utils/rand.hpp>

USERVER_NAMESPACE_BEGIN

namespace utils {

namespace {

auto TieSettings(const PeriodicTask::Settings& settings) {
    // Can't use Boost.Pfr, because Settings has custom constructors.
    const auto& [f1, f2, f3, f4, f5, f6] = settings;
    return std::tie(f1, f2, f3, f4, f5, f6);
}

}  // namespace

class PeriodicTask::Impl final {
public:
    enum class SuspendState { kRunning, kSuspended };

    std::string name;
    std::atomic<bool> is_name_set{false};
    PeriodicTask::Callback callback;
    engine::TaskWithResult<void> task;
    rcu::Variable<PeriodicTask::Settings> settings;
    engine::SingleConsumerEvent changed_event;
    std::atomic<bool> should_force_step{false};
    std::optional<std::minstd_rand> mutate_period_random;

    // For kNow only
    engine::Mutex step_mutex;
    std::atomic<SuspendState> suspend_state;

    std::optional<USERVER_NAMESPACE::testsuite::PeriodicTaskRegistrationHolder> registration_holder;

    Impl()
        : settings(std::chrono::seconds(1)),
          suspend_state(SuspendState::kRunning)
    {}

    void DoStart();
    void Run();
    bool Step();
    bool StepDebug(bool preserve_span);
    bool DoStep();
    std::chrono::milliseconds MutatePeriod(std::chrono::milliseconds period);
    std::string_view GetName() const noexcept;
    void SuspendDebug();
    void ResumeDebug();
};

bool PeriodicTask::Settings::operator==(const Settings& other) const noexcept {
    return TieSettings(*this) == TieSettings(other);
}

bool PeriodicTask::Settings::operator!=(const Settings& other) const noexcept {
    return TieSettings(*this) != TieSettings(other);
}

PeriodicTask::PeriodicTask()
    : impl_()
{}

PeriodicTask::PeriodicTask(std::string name, Settings settings, Callback callback)
    : impl_()
{
    impl_->name = std::move(name);
    impl_->is_name_set = true;
    impl_->callback = std::move(callback);
    impl_->settings.Assign(std::move(settings));
    impl_->suspend_state = Impl::SuspendState::kRunning;
    impl_->DoStart();
}

PeriodicTask::~PeriodicTask() {
    impl_->registration_holder = std::nullopt;
    Stop();
}

void PeriodicTask::Start(std::string name, Settings settings, Callback callback) {
    UASSERT_MSG(!name.empty(), "Periodic task must have a name");
    auto settings_ptr = impl_->settings.StartWriteEmplace(std::move(settings));
    // Set name_ under the 'settings_' mutex.
    if (!impl_->is_name_set) {
        impl_->name = std::move(name);
        impl_->is_name_set = true;
    } else {
        UINVARIANT(
            impl_->name == name,
            fmt::format("PeriodicTask name must not be changed on the fly, old={}, new={}", impl_->name, name)
        );
    }
    // Stop here so that if the invariant above fails, the task is not affected.
    Stop();
    impl_->callback = std::move(callback);
    settings_ptr.Commit();
    impl_->DoStart();
}

void PeriodicTask::Impl::DoStart() {
    LOG_DEBUG() << "Starting PeriodicTask with name=" << GetName();
    auto settings_ptr = settings.Read();
    auto& task_processor =
        settings_ptr->task_processor ? *settings_ptr->task_processor : engine::current_task::GetTaskProcessor();
    task = engine::CriticalAsyncNoSpan(task_processor, &PeriodicTask::Impl::Run, this);
}

void PeriodicTask::Stop() noexcept {
    const auto name = impl_->GetName();
    try {
        if (IsRunning()) {
            LOG_INFO() << "Stopping PeriodicTask with name=" << name;
            impl_->task.SyncCancel();
            impl_->changed_event.Reset();
            impl_->should_force_step = false;
            impl_->task = engine::TaskWithResult<void>();
            LOG_INFO() << "Stopped PeriodicTask with name=" << name;
        }
    } catch (std::exception& e) {
        LOG_ERROR() << "Exception while stopping PeriodicTask with name=" << name << ": " << e;
    } catch (...) {
        LOG_ERROR() << "Exception while stopping PeriodicTask with name=" << name;
    }
}

void PeriodicTask::SetSettings(Settings settings) {
    bool should_notify_task{};
    {
        auto writer = impl_->settings.StartWrite();
        if (settings == *writer) {
            // Updating an RCU is slow, better to avoid it when possible.
            return;
        }
        settings.flags = writer->flags;
        should_notify_task = settings.period != writer->period || settings.exception_period != writer->exception_period;
        *writer = std::move(settings);
        writer.Commit();
    }

    if (should_notify_task) {
        LOG_DEBUG() << "periodic task settings have changed, signalling name=" << impl_->GetName();
        impl_->changed_event.Send();
    }
}

void PeriodicTask::ForceStepAsync() {
    impl_->should_force_step = true;
    impl_->changed_event.Send();
}

bool PeriodicTask::SynchronizeDebug(bool preserve_span) {
    if (!IsRunning()) {
        return false;
    }

    return impl_->StepDebug(preserve_span);
}

bool PeriodicTask::IsRunning() const { return impl_->task.IsValid(); }

void PeriodicTask::Impl::Run() {
    bool skip_step = false;
    {
        auto l_settings = settings.Read();
        if (!(l_settings->flags & Flags::kNow)) {
            skip_step = true;
        }
    }

    while (!engine::current_task::ShouldCancel()) {
        const auto before = std::chrono::steady_clock::now();
        bool no_exception = true;

        if (!std::exchange(skip_step, false)) {
            no_exception = Step();
        }

        const auto l_settings = settings.Read();
        auto period = l_settings->period;
        const auto exception_period = l_settings->exception_period.value_or(period);

        if (!no_exception) {
            period = exception_period;
        }

        std::chrono::steady_clock::time_point start;
        if (l_settings->flags & Flags::kStrong) {
            start = before;
        } else {
            start = std::chrono::steady_clock::now();
        }

        while (changed_event.WaitForEventUntil(start + MutatePeriod(period))) {
            if (should_force_step.exchange(false)) {
                break;
            }
            // The config variable value has been changed, reload
            const auto l_settings = settings.Read();
            period = l_settings->period;
            const auto exception_period = l_settings->exception_period.value_or(period);
            if (!no_exception) {
                period = exception_period;
            }
        }
    }
}

bool PeriodicTask::Impl::DoStep() {
    auto settings_ptr = settings.Read();
    const auto span_log_level = settings_ptr->span_level;
    const auto name = GetName();
    tracing::Span span(std::string{name});
    span.SetLogLevel(span_log_level);
    try {
        callback();
        return true;
    } catch (const std::exception& e) {
        LOG_ERROR() << "Exception in PeriodicTask with name=" << name << ": " << e;
        return false;
    }
}

bool PeriodicTask::Impl::Step() {
    const std::lock_guard lock_step(step_mutex);

    if (suspend_state.load() == SuspendState::kSuspended) {
        LOG_INFO() << "Skipping suspended PeriodicTask with name=" << GetName();
        return true;
    }

    return DoStep();
}

bool PeriodicTask::Impl::StepDebug(bool preserve_span) {
    const std::lock_guard lock_step(step_mutex);

    std::optional<tracing::Span> testsuite_oneshot_span;
    if (preserve_span) {
        tracing::Span span("periodic-synchronize-debug-call");
        testsuite_oneshot_span.emplace(std::move(span));
    }

    return DoStep();
}

std::chrono::milliseconds PeriodicTask::Impl::MutatePeriod(std::chrono::milliseconds period) {
    auto settings_ptr = settings.Read();
    if (!(settings_ptr->flags & Flags::kChaotic)) {
        return period;
    }

    if (!mutate_period_random) {
        mutate_period_random
            .emplace(utils::WithDefaultRandom(std::uniform_int_distribution<std::minstd_rand::result_type>{}));
    }

    const auto jitter = settings_ptr->distribution;
    std::uniform_int_distribution distribution{(period - jitter).count(), (period + jitter).count()};
    const auto ms = distribution(*mutate_period_random);
    return std::chrono::milliseconds(ms);
}

std::string_view PeriodicTask::Impl::GetName() const noexcept {
    return is_name_set ? std::string_view{name} : "<name not set>";
}

void PeriodicTask::SuspendDebug() { impl_->SuspendDebug(); }

void PeriodicTask::ResumeDebug() { impl_->ResumeDebug(); }

void PeriodicTask::Impl::SuspendDebug() {
    // step_mutex_ waits, for a potentially long time, for Step() call completion
    const std::lock_guard lock_step(step_mutex);
    auto prior_state = suspend_state.exchange(SuspendState::kSuspended);
    if (prior_state != SuspendState::kSuspended) {
        LOG_DEBUG() << "Periodic task " << GetName() << " suspended";
    }
}

void PeriodicTask::Impl::ResumeDebug() {
    auto prior_state = suspend_state.exchange(SuspendState::kRunning);
    if (prior_state != SuspendState::kRunning) {
        LOG_DEBUG() << "Periodic task " << GetName() << " resumed";
    }
}

void PeriodicTask::RegisterInTestsuite(testsuite::PeriodicTaskControl& periodic_task_control) {
    UINVARIANT(impl_->is_name_set, "PeriodicTask::RegisterInTestsuite should be called after Start");
    impl_->registration_holder.emplace(periodic_task_control, std::string{impl_->GetName()}, *this);
}

PeriodicTask::Settings PeriodicTask::GetCurrentSettings() const {
    auto settings_ptr = impl_->settings.Read();
    return *settings_ptr;
}

}  // namespace utils

USERVER_NAMESPACE_END
